package com.wdl.webserver.api;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.MediaType;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.wdl.webserver.Utils;
import com.wdl.webserver.ZuulFilters;
import com.wdl.webserver.data.AlarmType;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;

/**
 * REST endpoints that read historical alarm/activity documents from Elasticsearch.
 *
 * <p>All endpoints filter on {@code personId}, on the {@code type} field and on
 * {@code raisedTime}. Time parameters are epoch <em>seconds</em>; timestamps in the
 * responses are converted to epoch <em>milliseconds</em>.
 *
 * <p>Every search requests at most {@link #MAX_HITS} documents, so larger result sets
 * are truncated. Search failures are logged and the data collected so far (possibly an
 * empty list) is returned instead of propagating the exception.
 */
@RestController
@RequestMapping("/api/elastic")
@Api(value="elastic", description="Elasticsearch data access")
public class ElasticService {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticService.class);

    private static final String INDEX_PREFIX = "hst.hist.";

    private static final String RAISEDTIME = "raisedTime";
    private static final String ACKEDTIME = "ackedTime";
    private static final String TYPE = "type";
    private static final String PERSON_ID = "personId";
    private static final String VALUE = "value";
    private static final String SLEEPHEAVY = "sleepHeavy";
    private static final String SLEEPLIGHT = "sleepLight";

    /** Maximum number of hits requested from a single search. */
    private static final int MAX_HITS = 10000;
    /** Search timeout handed to Elasticsearch, in seconds. */
    private static final long SEARCH_TIMEOUT_SECONDS = 60;
    /** Fields passed as the "excludes" list of the source filter. */
    private static final String[] EXCLUDED_FIELDS = {"_source", "_type"};

    @Autowired
    public RestTemplate restTemplate;

    @Value("${zuul.routes.backend.url}")
    String datarest;

    /**
     * Builds the index name (or wildcard pattern) covering the given time range.
     *
     * <p>The name has the form {@code hst.hist.<personId % 10>.<year>.<month>} when both
     * ends of the range fall into the same year and month. If only the year matches, the
     * month is replaced by {@code *}; if the years differ, everything after the person
     * bucket is replaced by {@code *}. Year and month are derived with the JVM default
     * calendar and time zone.
     *
     * @param personId  person whose bucket ({@code personId % 10}) is addressed
     * @param startTime range start in epoch seconds
     * @param endTime   range end in epoch seconds
     * @return the index name or wildcard pattern to search
     */
    private String getIndex(long personId, long startTime, long endTime) {
        Calendar cal = Calendar.getInstance();
        cal.setTimeInMillis(startTime * 1000);
        int startYear = cal.get(Calendar.YEAR);
        int startMonth = cal.get(Calendar.MONTH) + 1; // Calendar months are 0-based

        cal.setTimeInMillis(endTime * 1000);
        int endYear = cal.get(Calendar.YEAR);
        int endMonth = cal.get(Calendar.MONTH) + 1;

        String index = INDEX_PREFIX + (personId % 10) + ".";
        if (startYear != endYear) {
            return index + "*";
        }
        index += startYear + ".";
        return startMonth == endMonth ? index + startMonth : index + "*";
    }

    /**
     * Returns the documents of one alarm type for a person within a closed time range.
     *
     * @param personId  person to query
     * @param startTime inclusive range start in epoch seconds
     * @param endTime   inclusive range end in epoch seconds
     * @param type      value the {@code type} field must match
     * @param request   current HTTP request, used for validation and log prefixes
     * @return one list per hit: {@code [raisedTimeMillis, value, ackedTimeMillis?]}, sorted
     *         by raised time ascending; {@code value} defaults to {@code 1} when absent and
     *         the acked time is only appended when present
     */
    @GetMapping(value = "/typeInRange", produces = MediaType.APPLICATION_JSON)
    @ApiOperation(value = "Get specific alarm data for a specified person given the time range in seconds.")
    public List<Object> getTypeInRange(
            @ApiParam("Person Id") @RequestParam("personId") long personId,
            @ApiParam("startTime") @RequestParam("startTime") long startTime,
            @ApiParam("endTime") @RequestParam("endTime") long endTime,
            @ApiParam("type") @RequestParam("type") long type,
            HttpServletRequest request) {
        LOG.info("{}started", Utils.getLogPrefix(request));

        ZuulFilters.validatePersonId(request, personId, "/api/elastic/typeInRange");

        List<Object> result = new ArrayList<>();
        SearchRequest searchRequest = buildSearchRequest(
                getIndex(personId, startTime, endTime),
                personId,
                QueryBuilders.termQuery(TYPE, type),
                closedTimeRange(startTime, endTime),
                TYPE, RAISEDTIME, ACKEDTIME, VALUE);

        try {
            for (SearchHit hit : executeSearch(searchRequest)) {
                Map<String, Object> source = hit.getSourceAsMap();
                List<Object> values = new ArrayList<>();

                // 1. raised time
                values.add(toEpochMillis(source.get(RAISEDTIME)));

                // 2. value; documents without a value field count as 1
                Object valueObj = source.get(VALUE);
                values.add(valueObj == null ? Integer.valueOf(1) : valueObj);

                // 3. acked time, only when the alarm has been acknowledged
                Object ackedTimeObj = source.get(ACKEDTIME);
                if (ackedTimeObj != null) {
                    values.add(toEpochMillis(ackedTimeObj));
                }

                result.add(values);
            }
        } catch (Exception e) {
            LOG.error("Failed to get values from elasticsearch, exception: {}", e.getMessage(), e);
        } finally {
            LOG.info("{}ended.", Utils.getLogPrefix(request));
        }

        return result;
    }

    /**
     * This one is used by APP to show the moves per hour in the last 24 hours.
     *
     * <p>After the stored documents, one extra entry holding the current time and the
     * value returned by the backend's {@code /data/currentMoves} endpoint is appended.
     * That entry is skipped when the search fails or the backend returns no value.
     *
     * @param personId person to query
     * @param request  current HTTP request, used for validation and log prefixes
     * @return one list per hit: {@code [raisedTimeMillis, value]}, sorted by raised time
     *         ascending, where a missing {@code value} is reported as an empty string
     */
    @GetMapping(value = "/moves", produces = MediaType.APPLICATION_JSON)
    @ApiOperation(value = "Get move data for a specified person within 24 hours from elasticsearch, also append the data for the current hour.")
    public List<Object> getMoves(@ApiParam("Person Id") @RequestParam("personId") long personId, HttpServletRequest request) {
        LOG.info("{}started", Utils.getLogPrefix(request));

        ZuulFilters.validatePersonId(request, personId, "/api/elastic/moves");

        List<Object> result = new ArrayList<>();

        // Plain long arithmetic: Math.round(long) would silently go through float and int,
        // losing up to about a minute of precision at current epoch values.
        long nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        long startTime = nowSeconds - TimeUnit.HOURS.toSeconds(24);

        QueryBuilder sinceStart = QueryBuilders.rangeQuery(RAISEDTIME).from(startTime).includeLower(true);
        SearchRequest searchRequest = buildSearchRequest(
                getIndex(personId, startTime, nowSeconds),
                personId,
                QueryBuilders.termQuery(TYPE, AlarmType.HIST_MOVE.ordinal()),
                sinceStart,
                TYPE, RAISEDTIME, VALUE);

        try {
            for (SearchHit hit : executeSearch(searchRequest)) {
                Map<String, Object> source = hit.getSourceAsMap();
                List<Object> values = new ArrayList<>();

                // 1. raised time
                values.add(toEpochMillis(source.get(RAISEDTIME)));

                // 2. value; documents without a value field report an empty string
                Object valueObj = source.get(VALUE);
                values.add(valueObj == null ? "" : valueObj);

                result.add(values);
            }

            Short currentMoves = restTemplate.getForObject(this.datarest + "/data/currentMoves?personId=" + personId, Short.class);
            if (currentMoves == null) {
                LOG.warn("Backend returned no current moves for personId {}", personId);
            } else {
                List<Object> values = new ArrayList<>();
                values.add(System.currentTimeMillis());
                values.add(currentMoves);
                result.add(values);
            }
        } catch (Exception e) {
            LOG.error("Failed to get values from elasticsearch, exception: {}", e.getMessage(), e);
        } finally {
            LOG.info("{}ended.", Utils.getLogPrefix(request));
        }

        return result;
    }

    /**
     * Returns the sleep documents of a person within a closed time range.
     *
     * @param personId  person to query
     * @param startTime inclusive range start in epoch seconds
     * @param endTime   inclusive range end in epoch seconds
     * @param request   current HTTP request, used for validation and log prefixes
     * @return one list per hit: {@code [raisedTimeMillis, sleepHeavy, sleepLight]}, sorted
     *         by raised time ascending; the sleep values are {@code null} when the
     *         document does not contain them
     */
    @GetMapping(value = "/sleepInRange", produces = MediaType.APPLICATION_JSON)
    @ApiOperation(value = "Get sleep data for a specified person given the time range in seconds.")
    public List<Object> getSleepInRange(
            @ApiParam("Person Id") @RequestParam("personId") long personId,
            @ApiParam("startTime") @RequestParam("startTime") long startTime,
            @ApiParam("endTime") @RequestParam("endTime") long endTime,
            HttpServletRequest request) {
        LOG.info("{}started", Utils.getLogPrefix(request));

        ZuulFilters.validatePersonId(request, personId, "/api/elastic/sleepInRange");

        List<Object> result = new ArrayList<>();
        SearchRequest searchRequest = buildSearchRequest(
                getIndex(personId, startTime, endTime),
                personId,
                QueryBuilders.termQuery(TYPE, AlarmType.SLEEP.ordinal()),
                closedTimeRange(startTime, endTime),
                TYPE, RAISEDTIME, SLEEPHEAVY, SLEEPLIGHT);

        try {
            for (SearchHit hit : executeSearch(searchRequest)) {
                Map<String, Object> source = hit.getSourceAsMap();
                List<Object> values = new ArrayList<>();

                values.add(toEpochMillis(source.get(RAISEDTIME)));
                values.add(source.get(SLEEPHEAVY));
                values.add(source.get(SLEEPLIGHT));

                result.add(values);
            }
        } catch (Exception e) {
            LOG.error("Failed to get values from elasticsearch, exception: {}", e.getMessage(), e);
        } finally {
            LOG.info("{}ended.", Utils.getLogPrefix(request));
        }

        return result;
    }

    /**
     * Returns the turn-over events (types {@code LEFT} and {@code RIGHT}) of a person
     * within a closed time range.
     *
     * @param personId  person to query
     * @param startTime inclusive range start in epoch seconds
     * @param endTime   inclusive range end in epoch seconds
     * @param request   current HTTP request, used for validation and log prefixes
     * @return one two-element array per hit: {@code [raisedTimeMillis, side]}, sorted by
     *         raised time ascending, where {@code side} is {@code 1} for {@code LEFT} and
     *         {@code 2} otherwise
     */
    @GetMapping(value = "/turnOverInRange", produces = MediaType.APPLICATION_JSON)
    @ApiOperation(value = "Get the turn over data for a specified person given the time range in seconds.")
    public List<Object> getTurnOverInRange(
            @ApiParam("Person Id") @RequestParam("personId") long personId,
            @ApiParam("startTime") @RequestParam("startTime") long startTime,
            @ApiParam("endTime") @RequestParam("endTime") long endTime,
            HttpServletRequest request) {
        LOG.info("{}started", Utils.getLogPrefix(request));

        // Path matches this endpoint's mapping, like the sibling endpoints do.
        ZuulFilters.validatePersonId(request, personId, "/api/elastic/turnOverInRange");

        List<Object> result = new ArrayList<>();

        List<Integer> turnTypes = new ArrayList<>();
        turnTypes.add(AlarmType.LEFT.ordinal());
        turnTypes.add(AlarmType.RIGHT.ordinal());

        SearchRequest searchRequest = buildSearchRequest(
                getIndex(personId, startTime, endTime),
                personId,
                QueryBuilders.termsQuery(TYPE, turnTypes),
                closedTimeRange(startTime, endTime),
                TYPE, RAISEDTIME);

        try {
            for (SearchHit hit : executeSearch(searchRequest)) {
                Map<String, Object> source = hit.getSourceAsMap();
                int hitType = Integer.parseInt(String.valueOf(source.get(TYPE)));

                Object[] values = new Object[2];
                values[0] = toEpochMillis(source.get(RAISEDTIME));
                values[1] = hitType == AlarmType.LEFT.ordinal() ? 1 : 2;
                result.add(values);
            }
        } catch (Exception e) {
            LOG.error("Failed to get values from elasticsearch, exception: {}", e.getMessage(), e);
        } finally {
            LOG.info("{}ended.", Utils.getLogPrefix(request));
        }

        return result;
    }

    /** Builds an inclusive {@code raisedTime} range query; both bounds are epoch seconds. */
    private static QueryBuilder closedTimeRange(long startTime, long endTime) {
        return QueryBuilders.rangeQuery(RAISEDTIME)
                .from(startTime).includeLower(true)
                .to(endTime).includeUpper(true);
    }

    /**
     * Assembles the search shared by all endpoints: person, type and time filters combined
     * with {@code must}, sorted by raised time ascending, limited to {@link #MAX_HITS} hits.
     *
     * @param index         index name or pattern to search
     * @param personId      value the {@code personId} field must match
     * @param typeQuery     filter on the {@code type} field
     * @param timeQuery     filter on the {@code raisedTime} field
     * @param includeFields source fields to return for each hit
     * @return the ready-to-execute search request
     */
    private static SearchRequest buildSearchRequest(String index, long personId, QueryBuilder typeQuery,
            QueryBuilder timeQuery, String... includeFields) {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.from(0);
        sourceBuilder.size(MAX_HITS);
        sourceBuilder.timeout(new TimeValue(SEARCH_TIMEOUT_SECONDS, TimeUnit.SECONDS));

        QueryBuilder personQuery = QueryBuilders.termQuery(PERSON_ID, personId);
        sourceBuilder.query(QueryBuilders.boolQuery().must(personQuery).must(typeQuery).must(timeQuery));
        sourceBuilder.sort(new FieldSortBuilder(RAISEDTIME).order(SortOrder.ASC));
        sourceBuilder.fetchSource(includeFields, EXCLUDED_FIELDS);

        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(sourceBuilder);
        return searchRequest;
    }

    /**
     * Runs the search on a freshly created client and closes that client afterwards.
     *
     * <p>A failure while closing the client is logged as a warning and does not discard
     * hits that were already retrieved.
     *
     * @param searchRequest request to execute
     * @return the hits of the response
     * @throws IOException if the search itself fails
     */
    private SearchHit[] executeSearch(SearchRequest searchRequest) throws IOException {
        RestHighLevelClient client = this.getElasticClient();
        try {
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            SearchHits hits = response.getHits();
            return hits.getHits();
        } finally {
            try {
                client.close();
            } catch (IOException e) {
                LOG.warn("Failed to close elasticsearch client", e);
            }
        }
    }

    /**
     * Converts a source field holding epoch seconds to epoch milliseconds.
     *
     * @param epochSeconds field value; a {@link Number} or anything whose string form
     *                     parses as a {@code long}
     * @return the value multiplied by 1000
     * @throws NumberFormatException if the value is not numeric (including {@code null})
     */
    private static long toEpochMillis(Object epochSeconds) {
        if (epochSeconds instanceof Number) {
            return ((Number) epochSeconds).longValue() * 1000;
        }
        return Long.parseLong(String.valueOf(epochSeconds)) * 1000;
    }

    /**
     * Creates a new client for the Elasticsearch node at {@code http://localhost:9200}.
     * The caller is responsible for closing the returned client.
     */
    private RestHighLevelClient getElasticClient() {
        return new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }
}
